本文提供使用 TCP 协议下的 Java SDK 收发事务消息的示例代码供您参考。
消息队列提供类似 X/Open XA 的分布式事务功能,通过消息队列事务消息,能达到分布式事务的最终一致。
对于新手用户,建议在正式收发消息前,阅读 Demo 工程来了解搭建消息队列工程的具体步骤。
交互流程
事务消息交互流程如下图所示。
详情请参见 消息类型 > 事务消息。
前提条件
您已完成以下操作:
发送事务消息
具体的示例代码,请以消息队列代码库为准。
发送事务消息包含以下两个步骤:
发送半事务消息(Half Message)及执行本地事务,示例代码如下。
import java.util.Properties; import com.alipay.sofa.sofamq.client.PropertyKeyConst; import io.openmessaging.api.Message; import io.openmessaging.api.MessagingAccessPoint; import io.openmessaging.api.OMS; import io.openmessaging.api.OMSBuiltinKeys; import io.openmessaging.api.transaction.LocalTransactionChecker; import io.openmessaging.api.transaction.LocalTransactionExecuter; import io.openmessaging.api.transaction.TransactionProducer; import io.openmessaging.api.transaction.TransactionStatus; public class TransactionProducerTest { public static void main(String... args) { Properties credentials = new Properties(); // 阿里云账号 AccessKey 拥有所有 API 的访问权限,风险很高。强烈建议您创建并使用 RAM 用户进行 API 访问或日常运维,请登录 RAM 控制台创建 RAM 用户。 // 此处以把 AccessKey 和 AccessKeySecret 保存在环境变量为例说明。 // 强烈建议不要把 AccessKey 和 AccessKeySecret 保存到代码里,会存在密钥泄漏风险 credentials.setProperty(OMSBuiltinKeys.ACCESS_KEY, "SOFA_AK_ENV"); credentials.setProperty(OMSBuiltinKeys.SECRET_KEY, "SOFA_SK_ENV"); // 设置 TCP 接入域名,进入控制台的概览页面查看接入点配置 MessagingAccessPoint accessPoint = OMS.builder().driver("sofamq").endpoint("$endpoint").withCredentials(credentials).build(); Properties properties = new Properties(); // 设置用户实例,进入控制台的概览页面查看接入点配置 properties.setProperty(PropertyKeyConst.INSTANCE_ID, "$instanceId"); properties.setProperty(PropertyKeyConst.GROUP_ID, "YOUR_GROUP"); TransactionProducer producer = accessPoint.createTransactionProducer(properties, newLocalTransactionChecker() { @Override public TransactionStatus check (Message msg){ // check business commit status return TransactionStatus.CommitTransaction; } }); producer.start(); Message message = new Message("$topic", "YOUR_TAG", "hello world".getBytes()); producer.send(message, new LocalTransactionExecuter() { @Override public TransactionStatus execute(Message msg, Object arg) { // if business success, then commit; else rollback return TransactionStatus.CommitTransaction; } }, null); } }
提交事务消息状态。当本地事务执行完成(执行成功或执行失败),需要通知服务器当前消息的事务状态。通知方式有以下两种:
执行本地事务完成后提交。
执行本地事务一直没提交状态,等待服务器回查消息的事务状态。事务状态有以下三种:
TransactionStatus.CommitTransaction
提交事务,允许订阅方消费该消息。TransactionStatus.RollbackTransaction
回滚事务,消息将被丢弃不允许消费。TransactionStatus.Unknow
无法判断状态,期待消息队列的 Broker 向发送方再次询问该消息对应的本地事务的状态。
事务回查机制说明
发送事务消息为什么必须要实现回查 Check 机制?当步骤 1 中半事务消息发送完成,但本地事务返回状态为
TransactionStatus.Unknow
,或者应用退出导致本地事务未提交任何状态时,从 Broker 的角度看,这条 Half 状态的消息的状态是未知的。因此 Broker 会定期要求发送方能 Check 该 Half 状态消息,并上报其最终状态。Check 被回调时,业务逻辑都需要做些什么?事务消息的 Check 方法里面,应该写一些检查事务一致性的逻辑。消息队列发送事务消息时需要实现
LocalTransactionChecker
接口,用来处理 Broker 主动发起的本地事务状态回查请求;因此在事务消息的 Check 方法中,需要完成两件事情:检查该半事务消息对应的本地事务的状态(committed or rollback)。
向 Broker 提交该半事务消息本地事务的状态。
订阅事务消息
事务消息的订阅与普通消息订阅一致,详情请参见 订阅消息。